Skip to main content

Async Performance Patterns

The One-Line Change That Costs 200ms Per Request

These two FastAPI endpoint implementations look nearly identical:

# Version A: shared client (correct)
import httpx
from contextlib import asynccontextmanager
from fastapi import FastAPI

_http_client: httpx.AsyncClient = None

@asynccontextmanager
async def lifespan(app: FastAPI):
global _http_client
_http_client = httpx.AsyncClient(
timeout=httpx.Timeout(10.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
yield
await _http_client.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/api/price/{ticker}")
async def get_price_v1(ticker: str):
response = await _http_client.get(f"https://api.prices.internal/{ticker}")
return response.json()


# Version B: new client per request (wrong)
@app.get("/api/price-v2/{ticker}")
async def get_price_v2(ticker: str):
async with httpx.AsyncClient() as client: # NEW CLIENT EVERY REQUEST
response = await client.get(f"https://api.prices.internal/{ticker}")
return response.json()

Under load at 500 req/s:

MetricVersion A (shared client)Version B (new client/req)
Median latency8ms218ms
p99 latency23ms891ms
Throughput500 req/s112 req/s
Error rate0%12% (connection timeouts)

The difference: TCP connection establishment. Every HTTP request in Version B negotiates a new TCP connection (and TLS handshake for HTTPS): approximately 2 round trips = 200ms on a cross-datacenter link. Version A reuses existing connections from a pool. 200ms × 500 req/s = the service cannot keep up.

This is the most common async performance mistake in production Python services. It is followed closely by a half-dozen others - all covered in this lesson.

What You Will Learn

  • Understand asyncio's event loop internals well enough to diagnose stalls
  • Drop in uvloop for a 2–4x event loop speedup
  • Configure and manage connection pools correctly for HTTP clients and databases
  • Use asyncio.Semaphore to cap concurrent outbound requests
  • Compare asyncio.gather, asyncio.TaskGroup, and sequential for correctness and performance
  • Implement backpressure with asyncio.Queue to prevent OOM under load spikes
  • Run CPU-bound and blocking I/O code correctly from async context
  • Profile and debug stalled asyncio tasks in production

Prerequisites

RequirementLevel Needed
Python async/await syntaxComfortable
asyncio.gather basicsFamiliar
HTTP and TCP fundamentalsHelpful
FastAPI or similar async frameworkHelpful

Section 1: asyncio Event Loop Internals

Understanding what the event loop actually does prevents a large class of async performance bugs.

The Event Loop Cycle

asyncio runs a single-threaded event loop. The core loop (simplified from CPython source) is:

_run_once():
1. Calculate the timeout for the next scheduled callback
2. Call epoll_wait() / kqueue() / IOCP with the timeout
→ This blocks the thread (but wakes on I/O or timeout)
3. Mark all I/O-ready file descriptors as ready
4. Execute all ready callbacks (zero timeout - don't block)
5. Execute any scheduled callbacks whose deadline has passed
6. Repeat forever

Visualised:

Event Loop Thread

├── epoll_wait(fd_set, timeout=0.003s)
│ ↑ blocks here until I/O ready or timeout

├── socket.read() ready → schedule http_response_callback
├── socket.write() ready → write next chunk

├── Execute callbacks:
│ http_response_callback() → resumes coroutine A
│ timer_callback() → resumes coroutine B

└── ← repeat ────────────────────────────────────────────

Critical implication: the event loop is single-threaded. A blocking call anywhere in a coroutine blocks ALL coroutines. There is no preemption - a coroutine runs until it explicitly yields control via await.

The Blocking Call Trap

import asyncio
import time

async def fetch_price(ticker: str) -> float:
await asyncio.sleep(0.001) # simulates network I/O
return 42.0

async def handle_request(request_id: int) -> None:
prices = await fetch_price("AAPL")

# BUG: time.sleep() blocks the ENTIRE event loop
# All other coroutines freeze for 0.5 seconds
time.sleep(0.5) # ← never do this in a coroutine

return {"id": request_id, "price": prices}

async def bad_service():
# 100 concurrent requests - but they all freeze when any hits time.sleep()
tasks = [handle_request(i) for i in range(100)]
await asyncio.gather(*tasks)

Other blocking calls that freeze the event loop:

  • requests.get() (synchronous HTTP)
  • open() followed by f.read() (synchronous file I/O)
  • time.sleep() (synchronous sleep)
  • CPU-heavy computation (no await - never yields)
  • subprocess.run() (synchronous subprocess)
  • Any socket.* call without await

Section 2: uvloop - Drop-In Event Loop Speedup

uvloop is a fast implementation of the asyncio event loop built on top of libuv (the same C library powering Node.js). It is a drop-in replacement requiring one line of code.

pip install uvloop
import asyncio
import uvloop

# Option 1: replace globally before any asyncio usage
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Option 2: use as a runner (Python 3.11+)
async def main():
...

uvloop.run(main())

# Option 3: for a specific loop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()

With FastAPI / Uvicorn

# uvicorn uses uvloop automatically when available
pip install uvicorn[standard] # installs uvloop as a dependency
uvicorn myapp:app --loop uvloop

Benchmark: asyncio vs uvloop

import asyncio
import uvloop
import time

async def echo_many():
"""Simulate many small I/O operations."""
for _ in range(100_000):
await asyncio.sleep(0) # yield control 100k times

# Standard asyncio
loop = asyncio.new_event_loop()
start = time.perf_counter()
loop.run_until_complete(echo_many())
asyncio_time = time.perf_counter() - start
loop.close()
print(f"asyncio: {asyncio_time:.3f}s")

# uvloop
loop = uvloop.new_event_loop()
start = time.perf_counter()
loop.run_until_complete(echo_many())
uvloop_time = time.perf_counter() - start
loop.close()
print(f"uvloop: {uvloop_time:.3f}s")
print(f"Speedup: {asyncio_time / uvloop_time:.1f}x")
asyncio: 1.847s
uvloop: 0.612s
Speedup: 3.0x

uvloop is typically 2–4x faster for I/O-heavy workloads. The speedup comes from libuv's efficient I/O multiplexing and tighter callback dispatch. For services with thousands of concurrent connections, this translates directly to higher throughput.

Section 3: Connection Pooling

Connection pools maintain a set of pre-established connections to a server. Instead of creating a new TCP connection per request (expensive), a connection from the pool is borrowed, used, and returned. This is the single highest-leverage async optimisation for most production services.

HTTP Client: httpx.AsyncClient

import httpx
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from typing import Optional

# Module-level shared client - one instance for the entire application
_http_client: Optional[httpx.AsyncClient] = None


def get_http_client() -> httpx.AsyncClient:
"""FastAPI dependency that returns the shared HTTP client."""
if _http_client is None:
raise RuntimeError("HTTP client not initialised - check lifespan setup")
return _http_client


@asynccontextmanager
async def lifespan(app: FastAPI):
global _http_client

_http_client = httpx.AsyncClient(
timeout=httpx.Timeout(
connect=2.0, # TCP connection timeout
read=10.0, # Response read timeout
write=5.0, # Request write timeout
pool=2.0, # Pool acquisition timeout
),
limits=httpx.Limits(
max_connections=200, # total concurrent connections
max_keepalive_connections=50, # persistent connections to keep alive
keepalive_expiry=30.0, # how long to keep idle connections
),
headers={
"User-Agent": "MyService/1.0",
"Accept-Encoding": "gzip, deflate",
},
)
yield
await _http_client.aclose()


app = FastAPI(lifespan=lifespan)

Database Connection Pool: SQLAlchemy Async

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from fastapi import Depends

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

engine = create_async_engine(
DATABASE_URL,
pool_size=20, # maintain 20 persistent connections
max_overflow=10, # allow up to 10 extra connections under load
pool_timeout=30, # wait up to 30s for a connection from the pool
pool_recycle=3600, # recycle connections after 1 hour (avoid stale)
pool_pre_ping=True, # test connection health before use
echo=False,
)

AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)

async def get_db() -> AsyncSession:
"""FastAPI dependency for a database session."""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise

The Pool Exhaustion Trap

Pool exhaustion: all connections in the pool are in use and a new request must wait. If the wait exceeds pool_timeout, an exception is raised.

import asyncio
import httpx

async def demonstrate_pool_exhaustion():
"""Shows what happens when pool_size is too small."""
client = httpx.AsyncClient(
limits=httpx.Limits(
max_connections=5, # pool of 5
max_keepalive_connections=5,
),
timeout=httpx.Timeout(pool=1.0), # 1s to get a connection
)

async def fetch(i):
# Each request takes 2 seconds - with 5-connection pool and
# 10 concurrent requests, 5 will wait and hit the timeout
await asyncio.sleep(2.0) # simulate slow response
return i

tasks = [fetch(i) for i in range(10)]
try:
results = await asyncio.gather(*tasks)
except httpx.PoolTimeout as e:
print(f"Pool exhausted: {e}")

await client.aclose()

Diagnosing pool exhaustion: pool exhaustion manifests as PoolTimeout exceptions that spike during high load. To detect it before it causes errors, expose pool metrics:

# httpx pool statistics (approximate)
def log_pool_stats(client: httpx.AsyncClient) -> None:
pool = client._transport._pool
print(f"Connections: {len(pool._connections)}/{pool._max_connections}")
print(f"Idle: {sum(1 for c in pool._connections if c.is_idle)}")

# SQLAlchemy pool statistics
def log_db_pool_stats() -> None:
pool = engine.pool
print(f"Pool size: {pool.size()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")
print(f"Invalid: {pool.invalidated()}")

Section 4: Semaphore for Rate Limiting

Even with a large connection pool, you may need to cap concurrent outbound requests to prevent overwhelming a downstream service (or to respect rate limits).

import asyncio
import httpx
from typing import Any

async def fetch_with_semaphore(
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
url: str,
) -> dict:
"""Fetch a URL while holding a semaphore slot."""
async with semaphore:
response = await client.get(url)
response.raise_for_status()
return response.json()


async def batch_fetch(
urls: list[str],
max_concurrent: int = 20,
) -> list[dict]:
"""
Fetch many URLs concurrently, but cap concurrent connections at max_concurrent.
Without the semaphore, all 1000 requests would launch simultaneously,
potentially overwhelming the target service.
"""
semaphore = asyncio.Semaphore(max_concurrent)

async with httpx.AsyncClient() as client:
tasks = [
fetch_with_semaphore(client, semaphore, url)
for url in urls
]
return await asyncio.gather(*tasks, return_exceptions=True)


# Usage: fetch 1000 URLs with at most 20 concurrent connections
urls = [f"https://api.example.com/item/{i}" for i in range(1000)]
results = asyncio.run(batch_fetch(urls, max_concurrent=20))

Adaptive Rate Limiting

For APIs with per-second rate limits (e.g., "100 requests per second"):

import asyncio
import time
from collections import deque

class RateLimiter:
"""
Token bucket rate limiter for async code.
Allows up to `rate` requests per `period` seconds.
"""

def __init__(self, rate: int, period: float = 1.0):
self.rate = rate
self.period = period
self._semaphore = asyncio.Semaphore(rate)
self._request_times: deque = deque()
self._lock = asyncio.Lock()

async def acquire(self) -> None:
"""Wait until a request slot is available."""
async with self._lock:
now = time.monotonic()

# Remove request timestamps older than the period
while self._request_times and self._request_times[0] < now - self.period:
self._request_times.popleft()

if len(self._request_times) >= self.rate:
# Wait until the oldest request falls outside the window
sleep_for = self._request_times[0] + self.period - now
await asyncio.sleep(sleep_for)

self._request_times.append(time.monotonic())

async def __aenter__(self):
await self.acquire()
return self

async def __aexit__(self, *args):
pass


# Usage
limiter = RateLimiter(rate=100, period=1.0) # 100 req/s

async def rate_limited_fetch(client, url):
async with limiter:
return await client.get(url)

Section 5: asyncio.gather vs asyncio.TaskGroup vs Sequential

Three patterns for running multiple coroutines. Each has different semantics for error handling.

Sequential (Baseline)

import asyncio
import time

async def fetch_weather(city: str) -> dict:
await asyncio.sleep(1.0) # simulates 1-second I/O
return {"city": city, "temp": 22.0}

async def get_dashboard_sequential(cities: list[str]) -> list[dict]:
"""Sequential: 10 cities × 1s = 10 seconds total."""
results = []
for city in cities:
result = await fetch_weather(city)
results.append(result)
return results

cities = ["London", "Paris", "Tokyo", "NYC", "Sydney",
"Mumbai", "Berlin", "Cairo", "Lagos", "Seoul"]

start = time.perf_counter()
results = asyncio.run(get_dashboard_sequential(cities))
print(f"Sequential: {time.perf_counter() - start:.2f}s") # 10.02s

asyncio.gather - Concurrent, One Error Cancels All

async def get_dashboard_gather(cities: list[str]) -> list[dict]:
"""Concurrent: 10 cities, all run simultaneously = ~1 second total."""
tasks = [fetch_weather(city) for city in cities]
return await asyncio.gather(*tasks)

start = time.perf_counter()
results = asyncio.run(get_dashboard_gather(cities))
print(f"gather: {time.perf_counter() - start:.2f}s") # 1.01s

Error handling with asyncio.gather:

# return_exceptions=False (default): first exception cancels nothing,
# but the exception propagates and gather raises it
# return_exceptions=True: exceptions are returned as values, not raised

async def get_dashboard_resilient(cities: list[str]) -> list[dict | Exception]:
tasks = [fetch_weather(city) for city in cities]
results = await asyncio.gather(*tasks, return_exceptions=True)

successes = []
for city, result in zip(cities, results):
if isinstance(result, Exception):
print(f"Failed to fetch {city}: {result}")
else:
successes.append(result)
return successes

asyncio.TaskGroup - Structured Concurrency (Python 3.11+)

import asyncio

async def get_dashboard_taskgroup(cities: list[str]) -> list[dict]:
"""
TaskGroup: if ANY task raises, ALL sibling tasks are cancelled.
This is structured concurrency - tasks cannot outlive their scope.
"""
results = []

async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_weather(city)) for city in cities]

# All tasks completed (or one raised → all cancelled)
return [task.result() for task in tasks]

The difference from gather:

BehaviourgatherTaskGroup
Exception from one taskReturns/raises, others continueCancels all siblings
Tasks can outlive the scopeYes (if not awaited)No - they are cancelled on exit
Timeout per taskManual with asyncio.wait_forManual with asyncio.wait_for
Python versionAllPython 3.11+
Preferred for structured codeLess structuredYes - recommended modern approach

Timing Comparison

import asyncio
import time

async def slow_io(duration: float, name: str) -> str:
await asyncio.sleep(duration)
return f"Done: {name}"

async def benchmark():
tasks_args = [(1.0, f"task-{i}") for i in range(10)]

# Sequential
start = time.perf_counter()
for duration, name in tasks_args:
await slow_io(duration, name)
print(f"Sequential: {time.perf_counter() - start:.2f}s")

# gather
start = time.perf_counter()
await asyncio.gather(*[slow_io(d, n) for d, n in tasks_args])
print(f"gather: {time.perf_counter() - start:.2f}s")

# TaskGroup
start = time.perf_counter()
async with asyncio.TaskGroup() as tg:
[tg.create_task(slow_io(d, n)) for d, n in tasks_args]
print(f"TaskGroup: {time.perf_counter() - start:.2f}s")

asyncio.run(benchmark())
Sequential: 10.01s
gather: 1.01s
TaskGroup: 1.01s

Both concurrent approaches have identical performance characteristics. TaskGroup is preferred for correctness (structured concurrency) in new code.

Section 6: Backpressure with asyncio.Queue

Without backpressure, a fast producer and slow consumer leads to unbounded queue growth - eventually OOM. asyncio.Queue with maxsize implements backpressure: the producer blocks (awaits) when the queue is full.

The Unbounded Queue Problem

import asyncio

# BAD: unbounded queue - producer runs at full speed
async def bad_pipeline():
queue = asyncio.Queue() # no maxsize - unlimited growth

async def producer():
for i in range(1_000_000):
await queue.put(i) # never blocks
# Problem: if consumer is slow, queue fills memory

async def consumer():
while True:
item = await queue.get()
await asyncio.sleep(0.001) # slow consumer
queue.task_done()

await asyncio.gather(producer(), consumer())

Bounded Queue with Backpressure

import asyncio
import time
from typing import AsyncIterator

async def producer(
queue: asyncio.Queue,
items: list,
) -> None:
"""
Produce items into a bounded queue.
If the queue is full (maxsize reached), await queue.put() blocks
until the consumer has processed something - automatic backpressure.
"""
for item in items:
await queue.put(item) # blocks if queue is full
await queue.put(None) # sentinel to signal completion


async def consumer(
queue: asyncio.Queue,
n_workers: int = 4,
) -> list:
"""
Consume items from the queue with N parallel workers.
Each worker processes one item at a time.
"""
results = []

async def worker():
while True:
item = await queue.get()
if item is None:
queue.task_done()
await queue.put(None) # re-put sentinel for other workers
break
try:
result = await process_item(item)
results.append(result)
finally:
queue.task_done()

workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
await asyncio.gather(*workers)
return results


async def bounded_pipeline(items: list, max_queue_size: int = 100) -> list:
"""
Bounded producer-consumer pipeline.
max_queue_size controls peak memory usage.
Producer will stall if consumer falls behind.
"""
queue = asyncio.Queue(maxsize=max_queue_size)

producer_task = asyncio.create_task(producer(queue, items))
consumer_result = await consumer(queue, n_workers=8)
await producer_task

return consumer_result

Queue as a Retry Buffer

import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkItem:
payload: Any
attempts: int = 0
max_attempts: int = 3

async def reliable_processor(items: list[Any]) -> None:
"""
Process items with automatic retry on failure.
Failed items are re-queued with a delay (exponential backoff).
"""
queue: asyncio.Queue[WorkItem | None] = asyncio.Queue(maxsize=500)

async def producer():
for item in items:
await queue.put(WorkItem(payload=item))
await queue.put(None) # sentinel

async def worker():
while True:
work_item = await queue.get()
if work_item is None:
queue.task_done()
break

try:
await process_item(work_item.payload)
queue.task_done()
except Exception as e:
work_item.attempts += 1
if work_item.attempts < work_item.max_attempts:
# Re-queue with backoff
delay = 2 ** work_item.attempts
await asyncio.sleep(delay)
await queue.put(work_item)
else:
print(f"Giving up on {work_item.payload} after {work_item.attempts} attempts")
queue.task_done()

await asyncio.gather(producer(), worker(), worker(), worker(), worker())

Section 7: CPU-Bound Work in Async

Async does not help with CPU-bound work - running a CPU-intensive function in a coroutine blocks the event loop for its entire duration. The right tools are ThreadPoolExecutor (for I/O-bound blocking code) and ProcessPoolExecutor (for CPU-bound code).

loop.run_in_executor - Offloading Blocking Work

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import functools

# Shared executors - create once, reuse
_thread_pool = ThreadPoolExecutor(max_workers=20)
_process_pool = ProcessPoolExecutor(max_workers=8)


async def run_in_thread(func, *args, **kwargs):
"""
Run a blocking I/O function in a thread pool.
The event loop continues processing other coroutines while this runs.

Use for: legacy sync code, blocking database drivers, file I/O,
third-party clients without async support.
"""
loop = asyncio.get_running_loop()
# functools.partial is needed to pass keyword arguments
if kwargs:
func = functools.partial(func, **kwargs)
return await loop.run_in_executor(_thread_pool, func, *args)


async def run_in_process(func, *args, **kwargs):
"""
Run a CPU-bound function in a process pool.
Bypasses the GIL - true parallelism for CPU-intensive work.

Use for: image processing, ML inference, large data transformation,
cryptographic operations.
"""
loop = asyncio.get_running_loop()
if kwargs:
func = functools.partial(func, **kwargs)
return await loop.run_in_executor(_process_pool, func, *args)


# Examples
import time
import requests # synchronous HTTP library (for illustration)

def blocking_http_call(url: str) -> dict:
"""Synchronous HTTP call - would block the event loop if called directly."""
response = requests.get(url, timeout=10)
return response.json()


def cpu_intensive_computation(data: list[float]) -> float:
"""CPU-bound work - needs ProcessPoolExecutor."""
return sum(x ** 2.0 for x in data) ** 0.5


@app.get("/data")
async def get_data():
# Blocking HTTP - run in thread pool
external_data = await run_in_thread(blocking_http_call, "http://slow-api.internal/data")

# CPU-heavy - run in process pool
score = await run_in_process(cpu_intensive_computation, external_data['values'])

return {"score": score}

asyncio.to_thread() - Python 3.9+

Simpler API for thread pool offloading:

import asyncio

def slow_sync_operation(data: bytes) -> dict:
"""Any blocking synchronous function."""
import json
return json.loads(data.decode())

async def handler():
raw = b'{"key": "value", "count": 42}'
# asyncio.to_thread handles the executor boilerplate
result = await asyncio.to_thread(slow_sync_operation, raw)
return result

Choosing the Right Executor

Work TypeExecutorWhy
Blocking I/O (sync HTTP, files)ThreadPoolExecutorReleases GIL during I/O - threads work
Legacy sync DB driverThreadPoolExecutorSame - I/O bound
CPU-heavy Python computationProcessPoolExecutorBypasses GIL via separate processes
NumPy / scipy operationsThreadPoolExecutorNumPy releases GIL internally
Image/video processing (OpenCV)ProcessPoolExecutorCPU-bound, benefits from process parallelism
ML model inference (CPU)ProcessPoolExecutorCPU-bound
ML model inference (GPU)ThreadPoolExecutorGPU ops release GIL; process overhead wasteful

Section 8: Profiling Async Code

Standard profiling tools (cProfile, py-spy) work with async code but produce confusing output because coroutine frames are stored in heap objects, not on the C call stack. Dedicated async profiling tools exist.

asyncio.all_tasks() - Inspecting Running Tasks

import asyncio

async def diagnose_stuck_tasks() -> None:
"""
Find tasks that have been running for too long without awaiting.
A healthy async service has short coroutine runs between awaits.
"""
tasks = asyncio.all_tasks()
for task in tasks:
if task.done():
continue

# Get the coroutine's current call stack
stack = task.get_stack()
if not stack:
continue

frame = stack[-1] # innermost frame
print(
f"Task: {task.get_name()}\n"
f" at: {frame.f_code.co_filename}:{frame.f_lineno}\n"
f" in: {frame.f_code.co_name}\n"
)

# Schedule periodic task diagnosis
async def watchdog(interval: float = 10.0) -> None:
while True:
await asyncio.sleep(interval)
await diagnose_stuck_tasks()

aiomonitor - Live Async Monitor

pip install aiomonitor
import asyncio
import aiomonitor

async def main():
# ... your application code ...
pass

if __name__ == "__main__":
with aiomonitor.start_monitor(loop=asyncio.get_event_loop()):
asyncio.run(main())

# Connect to the monitor:
# nc localhost 50101
# Available commands: ps, cancel, where <id>, stacktrace

aiomonitor attaches a telnet server to the running event loop. You can list tasks, inspect their stacks, and cancel them without stopping the service.

Finding a Task That Never Yields

A common bug: a coroutine does CPU-heavy work in a loop without awaiting, starving all other coroutines.

import asyncio
import time

# BAD: CPU-bound loop without awaiting - blocks everything for 30 seconds
async def malicious_coroutine():
start = time.time()
result = 0
while time.time() - start < 30:
result += sum(x**2 for x in range(10_000)) # CPU work, no await
return result

# Detecting it:
async def detect_slow_callbacks():
"""
asyncio.get_event_loop().slow_callback_duration sets the threshold
for logging warnings about slow callbacks.
"""
loop = asyncio.get_running_loop()
loop.slow_callback_duration = 0.1 # warn if any callback takes > 100ms
# asyncio will log warnings like:
# "Executing <Task ... coro=<malicious_coroutine()>> took 30.2 seconds"

pyinstrument for Async Profiles

from pyinstrument import Profiler
import asyncio

profiler = Profiler(async_mode='enabled')

async def main():
with profiler:
await run_application()

asyncio.run(main())
profiler.open_in_browser()

pyinstrument with async_mode='enabled' correctly handles async code - it profiles across await boundaries and shows the time tasks spend waiting vs. running.

Production Patterns Reference

Startup and Shutdown Checklist

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
import httpx
from sqlalchemy.ext.asyncio import create_async_engine

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Production-grade lifespan: initialise all shared resources."""

# 1. HTTP client pool
app.state.http_client = httpx.AsyncClient(
limits=httpx.Limits(
max_connections=200,
max_keepalive_connections=50,
),
timeout=httpx.Timeout(connect=2.0, read=10.0),
)

# 2. Database engine
app.state.db_engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
)

# 3. Semaphore for external API rate limiting
app.state.api_semaphore = asyncio.Semaphore(50)

# 4. Background tasks queue (bounded)
app.state.work_queue = asyncio.Queue(maxsize=500)

yield

# Shutdown - reverse order
await app.state.http_client.aclose()
await app.state.db_engine.dispose()

Interview Questions

Q1: The asyncio event loop is single-threaded. How does it handle 1000 concurrent HTTP requests without running out of threads?

The event loop does not use a thread per connection. Instead, it uses OS-level I/O multiplexing: epoll on Linux, kqueue on macOS/BSD, IOCP on Windows. A single epoll_wait() call can monitor thousands of file descriptors simultaneously, returning immediately with the list of descriptors that are ready (have data to read or can accept writes).

The flow for 1000 concurrent HTTP requests:

  1. 1000 sockets are opened and registered with epoll
  2. epoll_wait() is called with all 1000 file descriptors
  3. When data arrives on any socket, epoll_wait() returns a list of ready fds
  4. The event loop executes the callbacks for the ready sockets (calling recv() on each)
  5. Completed callbacks resume the waiting coroutines (await response)
  6. epoll_wait() is called again with the remaining waiting sockets

The key: blocking on epoll_wait() is cheap (one system call waits on all fds). There are no threads allocated per connection - the event loop thread serves all connections via callbacks. This is why asyncio can handle tens of thousands of concurrent connections with minimal memory overhead (~few KB per task vs ~2MB stack per thread).

Q2: What is connection pool exhaustion and how would you diagnose and fix it in a production FastAPI service?

Pool exhaustion occurs when all connections in the pool are checked out simultaneously and a new request must wait for one to be returned. If the wait exceeds pool_timeout, a PoolTimeout exception is raised - the request fails.

Diagnosis:

  • Error logs: PoolTimeout or sqlalchemy.exc.TimeoutError spike during load
  • Monitor pool metrics: pool.checkedout() consistently equals pool_size + max_overflow
  • p99 latency increases dramatically under load (pool wait time is included in request latency)

Root causes and fixes:

  1. pool_size is too small: increase it. Rule of thumb: pool_size = (number of async workers) × (average concurrent DB queries per request).

  2. Connections not being returned: a coroutine that raises an exception inside a with session: block without proper error handling can leave a connection checked out. Fix by ensuring all session code uses a try/finally or async with session: context manager.

  3. Slow queries holding connections: a query that takes 30 seconds holds a pool connection for 30 seconds. Diagnose with query timing logs. Fix: query optimisation, adding indexes, or query timeouts.

  4. max_overflow too low: max_overflow allows temporary extra connections during peaks. If your traffic has sharp spikes, increasing max_overflow buys time for the spike to subside.

  5. Upstream service is slow: if the downstream database is the bottleneck, adding more connections is a partial fix. Address the root cause.

Q3: When should you use asyncio.gather vs asyncio.TaskGroup? Is there a performance difference?

There is no meaningful performance difference - both schedule tasks on the same event loop and both run them concurrently. The difference is in error handling semantics and code structure.

asyncio.gather with return_exceptions=False (default) propagates the first exception raised, but does not cancel the other tasks. Tasks that were started continue running even after the exception propagates to the caller. This can cause background tasks to run after their results are no longer needed - wasting resources and potentially causing confusing side effects.

asyncio.TaskGroup (Python 3.11+) implements structured concurrency: if any task raises an exception, all sibling tasks are immediately cancelled. The async with TaskGroup() as tg: block does not exit until all tasks are complete (or cancelled). No task can outlive its scope.

Use asyncio.TaskGroup when:

  • You want "all succeed or all fail" semantics
  • You want strong guarantees that tasks do not outlive their scope
  • You are writing new code targeting Python 3.11+

Use asyncio.gather when:

  • You need return_exceptions=True (collect all results including exceptions)
  • You need compatibility with Python < 3.11
  • You want tasks to continue even if one fails (some dashboards, batch collectors)

Q4: Describe the correct way to run a CPU-bound function from an async handler. What goes wrong if you call it directly in the coroutine?

Calling a CPU-bound function directly in a coroutine blocks the event loop thread for the function's entire duration. During this time, the event loop cannot process any other coroutines - no other requests are handled, no I/O callbacks fire, and asyncio.sleep() calls in other tasks do not resume. The service is effectively frozen.

The correct approach for CPU-bound work is loop.run_in_executor() with a ProcessPoolExecutor:

async def handler(data: list[float]) -> float:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
process_pool, # ProcessPoolExecutor - bypasses the GIL
cpu_heavy_function, # the function to run
data, # its argument
)
return result

run_in_executor runs the function in a subprocess (ProcessPoolExecutor) or thread (ThreadPoolExecutor), while the event loop continues serving other requests. When the work completes, the event loop is notified and the coroutine resumes.

Important distinction: ThreadPoolExecutor is appropriate for blocking I/O-bound code (legacy synchronous HTTP clients, blocking database drivers) but does NOT bypass the GIL for CPU-bound Python code. For CPU-intensive Python, use ProcessPoolExecutor. For CPU-intensive NumPy/Cython/Numba code (which releases the GIL internally), ThreadPoolExecutor is sufficient and has lower overhead than spawning processes.

Q5: You are building a service that calls 50 external APIs per request. With asyncio.gather, all 50 calls start simultaneously. What are the risks and how would you control them?

Risk 1 - Overwhelming the downstream services: 1000 req/s × 50 concurrent calls = 50,000 outbound connections simultaneously. Most external APIs have rate limits and connection caps. Exceeding these causes 429 (rate limit) or connection refused errors.

Risk 2 - Pool exhaustion: your connection pool has a fixed number of connections. 1000 req/s × 50 simultaneous calls each requires a pool of 50,000 to avoid queuing - not realistic.

Risk 3 - Memory spikes: 50,000 in-flight responses, each holding a response buffer in memory, can cause OOM under load.

Controls:

  1. asyncio.Semaphore - cap concurrent calls to each service. semaphore = asyncio.Semaphore(10) limits to 10 concurrent calls to a given API.

  2. Staggered fanout - do not fire all 50 calls simultaneously. Process them in batches of 10 with a small delay between batches.

  3. Timeouts per call - asyncio.wait_for(fetch(), timeout=2.0) ensures slow upstream APIs do not hold connections indefinitely.

  4. Circuit breaker - if a service returns errors for 5 consecutive calls, stop sending to it for 30 seconds. Libraries like aiobreaker or circuitbreaker implement this pattern.

  5. Correct pool sizing - set max_connections in the HTTP client to match your concurrency target. With Semaphore(10) and 1000 req/s, the peak concurrent calls to any one service is 10 - set max_keepalive_connections to something reasonable like 20.

© 2026 EngineersOfAI. All rights reserved.